Refactor PubSub acks to independent requests #37
Conversation
Previously, pubsub acknowledgements were sent over the client stream of the streaming pull gRPC. This provided limited feedback for the ack/nack/modify callers, who had no (good) way to know their request was actually submitted. Furthermore, there's evidence that some bug existed around connection resets and long ack times, although a definitive cause was not identified.

This change uses explicit `acknowledge` and `modify_ack_deadline` RPC calls to submit acks instead of the client stream. These enable much clearer feedback for ack callers, as well as better backpressure regulation in general. This implementation was loosely inspired by the approach in the golang pubsub library[1].

[1]: https://github.com/googleapis/google-cloud-go/blob/94d040898cc9e85fdac76560765b01cfd019d0b4/pubsub/iterator.go#L422-L446
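As a rough sketch of the shape this takes (hypothetical types; the real diff routes requests through an `AckRouter` and a background worker that issues the explicit RPCs):

```rust
use tokio::sync::{mpsc, oneshot};

/// Hypothetical feedback-carrying request: a background worker batches these
/// into explicit `acknowledge` RPCs and reports each outcome back through
/// the oneshot sender.
struct AckRequest {
    ack_id: String,
    completion: oneshot::Sender<Result<(), tonic::Status>>,
}

/// Caller side: rather than a fire-and-forget write onto the streaming
/// pull's client stream, submit the ack and await the actual RPC outcome.
async fn ack(
    acks: &mpsc::UnboundedSender<AckRequest>,
    ack_id: String,
) -> Result<(), tonic::Status> {
    let (completion, listener) = oneshot::channel();
    acks.send(AckRequest { ack_id, completion })
        .map_err(|_| tonic::Status::cancelled("ack worker stopped"))?;
    // resolves once the worker has submitted the `acknowledge` RPC and
    // observed its result (or dropped the sender half)
    listener
        .await
        .unwrap_or_else(|_| Err(tonic::Status::cancelled("ack worker stopped")))
}
```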
// now create the future to actually wait on the outcome
async move {
    match send_result {
        Ok(()) => match listener.await {
When the `oneshot::Receiver` is used as a Future like here, does that mean it waits indefinitely? There's no need to timeout here?
Right, it waits until there is a response or until the sender half is dropped. A user can add a timeout around `ack()` in their application code if they'd like.
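For instance, a sketch of such an application-side timeout (assuming the ack future resolves to the RPC outcome as described above):

```rust
use std::time::Duration;
use tokio::time::timeout;

/// Bound the wait on an ack future in application code; `ack()` itself
/// resolves only once the worker reports the RPC outcome (or is dropped).
async fn ack_with_timeout(
    ack: impl std::future::Future<Output = Result<(), tonic::Status>>,
) -> Result<(), tonic::Status> {
    match timeout(Duration::from_secs(30), ack).await {
        // the acknowledge RPC completed (Ok or an error status)
        Ok(result) => result,
        // gave up waiting; the ack may still land on the server later
        Err(_elapsed) => Err(tonic::Status::deadline_exceeded("ack timed out")),
    }
}
```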
Once this commit is landed, we also need to update hedwig-rust to change the ya-gcp dependency to 0.11.1, right?
lgtm!
    self.inner.clone(),
    self.inner.clone(),
    self.inner.clone(),
],
🙃
    .drain(..)
    .map(|TokenFeedback { completion, .. }| completion);

// peel off the first to avoid cloning in the common single-message case
I was going to ask "isn't the response tiny anyway?", but `tonic::Status` is surprisingly big...
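The pattern being referenced, as a sketch (hypothetical function; the completion senders come from the drained `TokenFeedback`s above): clone the result for all but one listener, so the common single-message batch never clones the large `tonic::Status`.

```rust
use tokio::sync::oneshot;

fn notify_listeners(
    mut completions: impl Iterator<Item = oneshot::Sender<Result<(), tonic::Status>>>,
    result: Result<(), tonic::Status>,
) {
    // peel off the first listener so the single-message case can move
    // `result` instead of cloning it
    if let Some(first) = completions.next() {
        for listener in completions {
            let _ = listener.send(result.clone());
        }
        let _ = first.send(result);
    }
}
```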
let (acks, acks_rx) = mpsc::unbounded_channel();
let (nacks, nacks_rx) = mpsc::unbounded_channel();
let (modacks, modacks_rx) = mpsc::unbounded_channel();
let ack_router = Arc::new(AckRouter { acks, nacks, modacks });
Since `AckRouter` is just 3 senders, why not just clone it instead of putting it behind an `Arc`?
Yeah, I went back and forth on this decision. The channels are basically `Arc<InnerThing>` themselves, so it's effectively a question of:

- Clone 1 Arc, have some indirection on ack/nack/modack, ack tokens hold 1 (additional) pointer
- Clone 3 Arcs, less indirection in ack/nack/modack, ack tokens hold 3 pointers

I went with 1 Arc on the premise that users may go through hundreds of thousands of messages and tokens, so making the tokens a little smaller and having fewer atomic inc/dec calls is a better trade. But the difference is minuscule either way; see the sketch below.
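A sketch of the two layouts under discussion (simplified payload types; names taken from the diff above):

```rust
use std::sync::Arc;
use tokio::sync::mpsc;

struct AckRouter {
    acks: mpsc::UnboundedSender<String>,
    nacks: mpsc::UnboundedSender<String>,
    modacks: mpsc::UnboundedSender<String>,
}

// Option taken: tokens share one Arc. Cloning a token bumps a single
// refcount, and each token holds one extra pointer.
struct AckToken {
    router: Arc<AckRouter>,
}

// Alternative: make the router itself Clone. Cloning a token then bumps
// three refcounts (each sender is roughly an Arc internally), and each
// token carries three pointers.
#[derive(Clone)]
struct CloneableAckRouter {
    acks: mpsc::UnboundedSender<String>,
    nacks: mpsc::UnboundedSender<String>,
    modacks: mpsc::UnboundedSender<String>,
}
```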
Because it's a minor version change, you can just update `ya-gcp` in the end application's lock file.
The changes in #37 did not attempt retries on the explicit ack requests. These appear to fail in practice periodically (~1hr), perhaps due to some connection resets from the server. This change introduces retries to those requests
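A minimal sketch of such a retry loop (assumed shape only; `send_acknowledge` is a hypothetical stand-in for the generated client call, and the actual change may use a different backoff policy):

```rust
use std::time::Duration;

// stand-in for the generated pub/sub client's `acknowledge` call
async fn send_acknowledge(_ack_ids: &[String]) -> Result<(), tonic::Status> {
    unimplemented!()
}

/// Retry transient `acknowledge` failures with simple exponential backoff.
async fn acknowledge_with_retry(ack_ids: Vec<String>) -> Result<(), tonic::Status> {
    let mut delay = Duration::from_millis(100);
    for _attempt in 0..5 {
        match send_acknowledge(&ack_ids).await {
            Ok(()) => return Ok(()),
            // connection resets typically surface as Unavailable
            Err(status) if status.code() == tonic::Code::Unavailable => {
                tokio::time::sleep(delay).await;
                delay *= 2;
            }
            Err(status) => return Err(status),
        }
    }
    // final attempt, propagating whatever error remains
    send_acknowledge(&ack_ids).await
}
```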